🕺Kapoeira💃

kapoeira

Presentation

kara
Mehdi Rebiai

Presentation

odile
Johanna Vauchel

Presentation

darmon
François Barbe

Opensource at Lectra

Lectra

lectra 4.0

Opensource

  • Use Foss projects to build our solutions

    • define a Foss policy to manage this usage

  • Use Innersource projects

    • incubator for futur Opensource projects

  • Contributor and creator of Opensource projects

Take Away 🎁

  • Discover a new tool to test your kafka streams

  • Help you in your communication with PO/QA/DEV

  • Tips to use it every day

  • Have a good time (we hope)

📽️ Kapoeira story 🎬

cine

Vector

vector

Enrich and collect data

enrichData

Data pipeline

data pipeline

We are perfect !

perfect

Data is perfect !

pipeline example

NO !

no not

NO !

pipeline example poo

Solution ?

TESTS OUR STREAMS!

How to test ?

fast

Fast and efficient…​

Scala Test Example

 1package com.lectra.kafka.stream.example
 2
 3import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
 4import org.apache.kafka.streams._
 5import org.scalatest.flatspec.AnyFlatSpec
 6import org.scalatest.matchers.should.Matchers
 7import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, GivenWhenThen}
 8
 9import java.io.File
10import java.util.UUID
11
12class KafkaStreamSelectKeyTest extends AnyFlatSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with GivenWhenThen {
13
14  private val stringSerializer = new StringSerializer()
15  private val stringDeserializer = new StringDeserializer()
16
17  private var driver: TopologyTestDriver = _
18  private var inputTopic: TestInputTopic[String, String] = _
19  private var outputTopic: TestOutputTopic[String, String] = _
20
21  private def tempDir: File = {
22    val ioDir = System.getProperty("java.io.tmpdir")
23    val f = new File(ioDir, "kafka-" + UUID.randomUUID().toString)
24    f.mkdirs()
25    f.deleteOnExit()
26    f
27  }
28
29  private def buildTopology(): Topology = {
30    import org.apache.kafka.streams.scala.StreamsBuilder
31    val builder = new StreamsBuilder
32    KafkaStreamSelectKey.topology(builder)
33    builder.build()
34  }
35
36  override def beforeEach(): Unit = {
37    KafkaStreamAvro.config.put(StreamsConfig.STATE_DIR_CONFIG, tempDir.getAbsolutePath)
38    driver = new TopologyTestDriver(buildTopology(), KafkaStreamSelectKey.config)
39    inputTopic = driver.createInputTopic(KafkaStreamSelectKey.topicIn, stringSerializer, stringSerializer)
40    outputTopic = driver.createOutputTopic(KafkaStreamSelectKey.topicChangedKey, stringDeserializer, stringDeserializer)
41  }
42
43  override def afterEach(): Unit = {
44    driver.close()
45  }
46
47
48  "Nominal case for select" should "change the key of records by combining key and value with -" in {
49    val key = "mykey"
50    val value = "myvalue"
51    val key2 = "yourkey"
52    val value2 = "yourvalue"
53
54    inputTopic.pipeInput(key, value)
55    inputTopic.pipeInput(key2, value2)
56    val expectedKey1 = s"$key-$value"
57    val expectedKey2 = s"$key2-$value2"
58
59    outputTopic.getQueueSize shouldBe 2
60    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey1, value)
61    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey2, value2)
62
63  }
64
65
66}

Happy 😀…​🤮

content vomi

…​ But it’s a mocked infrastructure

fail

It did not test the integration with the Kafka cluster

…​ And only unit tests

end to end

How to test several streams ?

…​ And it’s not for QA (=👮)

les nuls police

…​ And it’s technical code

simon kara

How to communicate with DEV/PO/QA ?

What do we need ?

test pyramid cesar triangle

Integration tests with a simple syntax

Inspiration : Karate

karate

Our context ≠ HTTP

kafkalogo

Integration with Kafka Streams !

What is Kapoeira?

Cucumber Scala, using specific Gherkin DSL.

cucumber

What is Gherkin ?

Feature: A calculator example

  Scenario: The sum example
    Given a = 2
    And b = 3
    When a + b
    Then result == 5

  Scenario: The mult example
    Given a = 2
    And b = 3
    When a * b
    Then result == 6

What is Cucumber ?

class StepDefinitions {
  var variables = collection.mutable.Map[String,Long]()
  var result: Long = 0

  @Given("^\\s*([a-zA-Z]+)\\s*=\\s*(\\d+)\\s*$")
  def addVariable(name: String, value: Long) : Unit = {
    variables = variables ++ (name , value)
  }

  @When("^\\s*([a-zA-Z]+)\\s*\\+\\s*([a-zA-Z]+)\\s*$")
  def sum(left: String, right: String): Unit = {
    val leftValue = variables.get(left)
    val rightValue = variables.get(right)
    assertNotNull(leftValue, "Unknown variable " + left)
    assertNotNull(rightValue, "Unknown variable " + right)
    result = leftValue + rightValue
  }

  @When("^\\s*([a-zA-Z]+)\\s*\\*\\s*([a-zA-Z]+)\\s*$")
  def mult(left: String, right: String): Unit = {
    val leftValue = variables.get(left)
    val rightValue = variables.get(right)
    assertNotNull(leftValue, "Unknown variable " + left)
    assertNotNull(rightValue, "Unknown variable " + right)
    result = leftValue * rightValue
  }

  @Then("^\\s*result\\s*==\\s*(\\d+)\\s*$")
  def result(expectedResult: Long) = {
    assertEquals(expectedResult, result)
  }
}

How does it work ?

archi

How does it work ?

kapoeira diagram

2020 - Birth of Kapoeira

young chabat
  • Inner Source @Lectra

  • First syntax created with a QA

  • Backend calling Confluent CLI

CLI…​

# Console producer
kafka-console-producer \
  --topic orders \
  --bootstrap-server broker:9092 \

# Console consumer
kafka-console-consumer \
  --topic orders \
  --bootstrap-server broker:9092 \
  --from-beginning

2020 - Custom backend

scala
  • Specific Scala implementation for Kafka Consumer/Producer

  • Better syntax with Gherkin Datatable

2021 - ZIO

zio

  • To improve perfs

  • Add parallel mode

  • Batch to manage jointure in topics

2023 - Open Source

2024 - New features

simon

Thanks to you !

Demo

buger quiz
rapport

User Story n°1

As a 🧑‍🍳, WHEN I send a 🥔 to my robot, THEN I expect to have 🍟

User Story n°2

As a 🧑‍🍳, WHEN I send 🥔🥔🥔 to my robot, THEN I expect to have 🍟🍟🍟

User Story n°3

As a 🧑‍🍳, WHEN I send 🥔 to my robot, with KETCHUP(🍅) header request, THEN I expect to have a 🍟 with KETCHUP(🍅).

User Story n°4

As a 🧑‍🍳, WHEN I send the ingredients (🍞,🍅,🥩,🥔) in dedicated robots, THEN I expect to have a menu (🍔 + 🍟)

Burger factory

burger quiz

🥦 REX 🥃

  • 👐 Big community in Lectra

  • 🤝 Used as acceptance tests, specifications during story grooming

  • ✏️ Easy for QA to enrich existing tests

  • 🔄 Used as end-to-end tests

Advantages 💪

advantages

  • Kafka infra

  • Simple to use

  • Communicate with PO/QA/DEV

  • tests as documentation

  • tests as acceptance for stories

Want to use it ? 👩‍🏭

banco

How to build ? 🔨

docker build -t kapoeira:latest .

How to use ? ⚒️

docker run --rm -ti \
-v <PATH_TO_YOUR_FEATURES_FOLDER>:/features \
-v /var/run/docker.sock:/var/run/docker.sock \
-e KAFKA_BOOTSTRAP_SERVER=<HOST:PORT[,HOST2:PORT2,HOST3:PORT3,...]> \
-e KAFKA_SCHEMA_REGISTRY_URL=<URL> \
-e KAFKA_USER=<XXX> \
-e KAFKA_PASSWORD=<****> \
-e JAAS_AUTHENT=<true (default) | false> \
-e LOGGING_LEVEL=<INFO (default) | ERROR | ...> \
-e THREADS=<8 (default) | ... > \
lectratech/kapoeira

How to contribute ? 💵

TODO

Thank you !

kapoeira

Thanks for your feedback